Skip to content

fix(ai): support async with on async streaming responses (Fixes #393)#645

Merged
turnipdabeets merged 6 commits into
mainfrom
fix/async-stream-context-manager-v2
Jun 3, 2026
Merged

fix(ai): support async with on async streaming responses (Fixes #393)#645
turnipdabeets merged 6 commits into
mainfrom
fix/async-stream-context-manager-v2

Conversation

@turnipdabeets
Copy link
Copy Markdown
Contributor

@turnipdabeets turnipdabeets commented Jun 3, 2026

💡 Motivation and Context

Fixes #393. Supersedes #622 (rebased onto current main, with @marandaneto's review addressed).

The async AI wrappers returned a bare async generator for streaming responses, which supports async for but not async with. Libraries like pydantic-ai consume streams via async with response:, so they broke with:

TypeError: 'async_generator' object does not support the asynchronous context manager protocol

This adds an AsyncStreamWrapper that supports async with (closing the underlying provider stream on exit) while preserving async for and await response.aclose(). Applied to the OpenAI, Anthropic, and Gemini async streaming wrappers.

Out of scope: Anthropic messages.stream() (still async def) and the sync with gap — left for a follow-up.

💚 How did you test it?

Added unit tests for the wrapper (async with, async for, early-exit capture, provider-stream close, aclose, exception propagation) and real-client regression tests covering async with for OpenAI chat/responses, Anthropic messages.create, and Gemini streaming. Confirmed the tests reproduce the #393 TypeError without the fix. Full AI test suite passes; ruff and mypy clean.

📝 Checklist

  • I reviewed the submitted code.
  • I added tests to verify the changes.
  • I updated the docs if needed.
  • No breaking change or entry added to the changelog.

If releasing new changes

  • Ran sampo add to generate a changeset file

The async AI wrappers returned a bare async generator for streaming
responses. Async generators support `async for` but not `async with`,
so libraries that enter the stream as a context manager (e.g.
pydantic-ai's `async with response:`) raised:

    TypeError: 'async_generator' object does not support the
    asynchronous context manager protocol

Add `AsyncStreamWrapper`, which wraps the PostHog tracking generator and
adds the async context manager protocol. On `__aexit__` it closes the
tracking generator (so the `finally` block that fires the PostHog usage
event always runs, even on early exit) and then closes the underlying
provider stream to release the HTTP connection, matching native SDK
behaviour. Attribute access is proxied to the provider stream so
metadata such as `.response` keeps working.

Applied to OpenAI chat completions, OpenAI responses, and Anthropic
messages streaming. Adds wrapper unit tests plus real-client regression
tests covering `async with` and early-exit stream closure.

Supersedes #622.

Co-authored-by: devteamaegis <devteam.aegis@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 3, 2026

posthog-python Compliance Report

Date: 2026-06-03 03:26:51 UTC
Duration: 175971ms

✅ All Tests Passed!

45/45 tests passed


Capture Tests

29/29 tests passed

View Details
Test Status Duration
Format Validation.Event Has Required Fields 513ms
Format Validation.Event Has Uuid 1507ms
Format Validation.Event Has Lib Properties 1505ms
Format Validation.Distinct Id Is String 1506ms
Format Validation.Token Is Present 1506ms
Format Validation.Custom Properties Preserved 1506ms
Format Validation.Event Has Timestamp 1506ms
Retry Behavior.Retries On 503 9513ms
Retry Behavior.Does Not Retry On 400 3509ms
Retry Behavior.Does Not Retry On 401 3505ms
Retry Behavior.Respects Retry After Header 9514ms
Retry Behavior.Implements Backoff 23515ms
Retry Behavior.Retries On 500 7510ms
Retry Behavior.Retries On 502 7515ms
Retry Behavior.Retries On 504 7506ms
Retry Behavior.Max Retries Respected 23528ms
Deduplication.Generates Unique Uuids 1498ms
Deduplication.Preserves Uuid On Retry 7513ms
Deduplication.Preserves Uuid And Timestamp On Retry 14513ms
Deduplication.Preserves Uuid And Timestamp On Batch Retry 7512ms
Deduplication.No Duplicate Events In Batch 1503ms
Deduplication.Different Events Have Different Uuids 1506ms
Compression.Sends Gzip When Enabled 1506ms
Batch Format.Uses Proper Batch Structure 1506ms
Batch Format.Flush With No Events Sends Nothing 1004ms
Batch Format.Multiple Events Batched Together 1505ms
Error Handling.Does Not Retry On 403 3508ms
Error Handling.Does Not Retry On 413 3506ms
Error Handling.Retries On 408 7513ms

Feature_Flags Tests

16/16 tests passed

View Details
Test Status Duration
Request Payload.Request With Person Properties Device Id 1002ms
Request Payload.Flags Request Uses V2 Query Param 1005ms
Request Payload.Flags Request Hits Flags Path Not Decide 1006ms
Request Payload.Flags Request Omits Authorization Header 1006ms
Request Payload.Token In Flags Body Matches Init 1006ms
Request Payload.Groups Round Trip 1006ms
Request Payload.Groups Default To Empty Object 1005ms
Request Payload.Person Properties Distinct Id Auto Populated When Caller Omits It 1006ms
Request Payload.Disable Geoip False Propagates As Geoip Disable False 1006ms
Request Payload.Disable Geoip Omitted Defaults To False 1006ms
Request Payload.Flag Keys To Evaluate Contains Only Requested Key 1005ms
Request Lifecycle.No Flags Request On Init Alone 503ms
Request Lifecycle.No Flags Request On Normal Capture 1506ms
Request Lifecycle.Two Flag Calls Produce Two Remote Requests 1010ms
Request Lifecycle.Mock Response Value Is Returned To Caller 1002ms
Side Effect Events.Get Feature Flag Captures Feature Flag Called Event 1508ms

mypy flagged stream.py:49 because T was used across __init__ and
__anext__ without the class being parameterized, so they resolved to
unrelated typevars. Declare the class Generic[T] (it is genuinely
generic over the chunk type) and carry [T] through the self-returning
annotations.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 3, 2026

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
posthog/ai/stream.py:58-75
If `self._generator.aclose()` raises — for example because `_capture_streaming_event` inside the tracking generator's `finally` block throws a network error — the provider stream's `close()` is never reached. This leaves the underlying HTTP connection open until GC collects the stream, which can exhaust connection-pool slots under load. Wrapping in `try/finally` guarantees `_stream.close()` always runs regardless of what `aclose()` does.

```suggestion
    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        # Close our tracking generator first so its `finally` block runs and
        # the PostHog usage event is captured, even when the caller breaks out
        # of the loop early. If it is already exhausted this is a no-op.
        try:
            await self._generator.aclose()
        finally:
            # Then close the underlying provider stream to release the HTTP
            # connection, matching native SDK behaviour. Provider streams expose an
            # async `close()`; bare async generators (e.g. in tests) expose
            # `aclose()`.
            if self._stream is not None:
                close = getattr(self._stream, "aclose", None) or getattr(
                    self._stream, "close", None
                )
                if close is not None:
                    await close()

        return False
```

### Issue 2 of 2
posthog/test/ai/test_async_stream_wrapper.py:8-25
`RecordingStream` here, `_RecordingAsyncStream` in `test_openai.py`, and `_RecordingAnthropicStream` in `test_anthropic.py` are structurally identical — each has `_items`, `closed`, `response = "provider-response"`, `__aiter__`/`__anext__`, and `async def close()`. Moving a single shared fixture (e.g. into a `conftest.py` or a small `posthog/test/ai/utils.py`) would remove the duplication and make any future changes a one-line update.

Reviews (1): Last reviewed commit: "fix(ai): make AsyncStreamWrapper Generic..." | Re-trigger Greptile

Comment thread posthog/ai/stream.py
Comment thread posthog/test/ai/test_async_stream_wrapper.py Outdated
- Wrap the generator close in try/finally so the underlying provider
  stream is always closed even if the PostHog capture in the generator's
  finally raises (avoids leaking the HTTP connection).
- Extract the duplicated mock stream into a shared
  posthog/test/ai/utils.py::RecordingAsyncStream used by all three test
  modules.
- Trim verbose comments/docstrings.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 3, 2026

Both fixes look correct:

  1. Connection leak — the try/finally in __aexit__ correctly ensures self._stream is always closed even if self._generator.aclose() raises. The close method resolution (acloseclose fallback) is a nice touch for handling both sync and async close APIs.

  2. Shared mockRecordingAsyncStream in utils.py is clean and complete — covers iteration, close(), and the .response attribute that tests the __getattr__ proxy.

One small note: __aexit__ returns False explicitly, which is fine and correct (doesn't suppress exceptions). LGTM — changes are solid.

Tip: You can customize Greptile's behavior for this repo with .greptile/rules.md and .greptile/config.json.

turnipdabeets and others added 3 commits June 2, 2026 23:02
…getattr__

Guard __getattr__ against names starting with '_'. Prevents infinite
recursion if the instance is built without __init__ (e.g. copy/pickle)
and stops hasattr/copy probes from leaking to the provider stream. Only
public metadata like .response is proxied.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review findings on #645:

- Forward async-generator protocol methods (aclose/asend/athrow) to the
  tracking generator instead of the provider stream. After pointing
  __getattr__ at the provider stream (which exposes close(), not
  aclose()), `await response.aclose()` regressed to AttributeError and
  dropped the $ai_generation event. Forwarding to the generator restores
  the pre-wrapper behaviour (aclose runs the finally, fires the event).
- Wrap async Gemini streaming (gemini_async.py) in AsyncStreamWrapper —
  it had the identical `async with` gap.
- Pass the provider stream by keyword (stream=response) at all call sites.
- Tests: aclose-fires-event, try/finally closes provider stream when the
  capture raises, exception propagation through __aexit__, payload
  assertions on the async-with path, and a Gemini async-with regression.
- Changeset: note the return-type change and that sync wrappers are
  unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Simplify the changeset to a plain user-facing entry (drop the behavior
note). Remove redundant test docstrings whose names already say it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@turnipdabeets turnipdabeets marked this pull request as ready for review June 3, 2026 03:25
@turnipdabeets turnipdabeets requested a review from a team June 3, 2026 03:27
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 3, 2026

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
posthog/ai/stream.py:55-62
**`aclose()` bypasses provider-stream cleanup**

When a caller does `await response.aclose()` directly (without `async with`), `__getattr__` forwards to `self._generator.aclose()`, which fires the PostHog capture event in the generator's `finally` block — but `self._stream.close()` is never called. The underlying HTTP connection is left open until GC. The `test_aclose_runs_generator_finally_and_captures` test verifies the event fires but intentionally does not assert `source.closed is True`, confirming this gap. Overriding `aclose()` as a proper method (mirroring `__aexit__`'s `try/finally`) would close the provider stream on any teardown path, not only `async with` exit.

Reviews (2): Last reviewed commit: "chore(ai): simplify changeset, trim comm..." | Re-trigger Greptile

@turnipdabeets turnipdabeets merged commit 44e6b14 into main Jun 3, 2026
31 checks passed
@turnipdabeets turnipdabeets deleted the fix/async-stream-context-manager-v2 branch June 3, 2026 13:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PostHog with pydantic-ai streaming raises TypeError: 'async_generator'

2 participants